libuv 线程池

历程

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
#define FIB_UNTIL 5
uv_loop_t *loop;

long fib_(long t) {
    if (t == 0 || t == 1)
        return 1;
    else
        return fib_(t-1) + fib_(t-2);
}

void fib(uv_work_t *req) {
    int n = *(int *) req->data;
    if (random() % 2)
        sleep(1);
    else
        sleep(3);
    long fib = fib_(n);
    fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
}

void after_fib(uv_work_t *req, int status) {
    fprintf(stderr, "Done calculating %dth fibonacci\n", *(int *) req->data);
}

int main() {
    loop = uv_default_loop();

    int data[FIB_UNTIL];
    uv_work_t req[FIB_UNTIL];
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        data[i] = i;
        req[i].data = (void *) &data[i];
        uv_queue_work(loop, &req[i], fib, after_fib);
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

参见libuv/src/threadpool.c文件

struct uv__work {
  void (*work)(struct uv__work *w);
  void (*done)(struct uv__work *w, int status);
  struct uv_loop_s* loop;
  void* wq[2];
};

uv__work就代表一个task,可以看到里面有两个函数指针(work代表任务实际操作,done用于对任务进行状态确认)。wq成员就是一个QUEUE的节点, uv__work就是通过wq与其他 uv__work连接成一个队列。

下面来看一下threadpool的初始化,代码如下:

#include "uv-common.h"
#include <stdlib.h>
#define MAX_THREADPOOL_SIZE 1024		// 最大1024

static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond; 					// 条件变量
static uv_mutex_t mutex;				// 互斥锁
static unsigned int idle_threads;
static unsigned int slow_io_work_running;
static unsigned int nthreads;			// 线程池有多少个槽
static uv_thread_t* threads;			// 指向槽头
static uv_thread_t default_threads[4];	// 默认4个
static QUEUE exit_message;
static QUEUE wq;
static QUEUE run_slow_work_message;
static QUEUE slow_io_pending_wq;


static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;
  
  /*
   * 1.线程池中的线程数,默认值为4
   */
  nthreads = ARRAY_SIZE(default_threads);
   /*
   * 2.从环境变量中得到UV_THREADPOOL_SIZE值,如果有的话,就更新nthreads值为环境变量的值
   */
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
   
  /*
  *  3.保证线程数量nthreads范围在 (1-MAX_THREADPOOL_SIZE)之间
  */
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

 /*
  * 4. 更新线程指针threads,如果nthreads大于默认的4个线程,就重新申请nthreads个内存
  */
  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }
  /*
  * 5. 初始化条件变量
  */
  if (uv_cond_init(&cond))
    abort();

  /*
  * 6. 初始互斥锁
  */
  if (uv_mutex_init(&mutex))
    abort();

    
  /*
  * 7. 初始化线程池队列头wq 这是个全局变量,还有slow_io_pending_wq,run_slow_work_message
  */
  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

 /*
  * 6. 初始话一个局部的信号量sem
  */
  if (uv_sem_init(&sem, 0))
    abort();

  /*
  * 6. 创建nthreads个线程,且回调函数都是worker
  */
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  /*
  * 7.等待信号量释放,如果释放掉说明,上面uv_thread_create成功
  */
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

​ 上面的代码中,一共创建了nthreads个线程,那么每个线程的执行代码是什么呢?由线程创建代码:uv_thread_create(threads + i, worker, &sem),可以看到,每一个线程都是执行worker函数,下面看看worker函数都在做什么:

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
  /*
   *  因为是多线程,所以需要互斥锁 ,这里加锁
   */
  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
     *   and we're at the threshold for that. 
     *  如果任务队列是空的,或者  (slow I/O 且在临界点)
     *
     */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1; 			// 空闲线程数加1
      uv_cond_wait(&cond, &mutex);	// 一直在等待条件变量
      idle_threads -= 1;			// 被唤醒之后,说明有任务被post到队列,因此空闲线程数需要减1
    }
    /*
     *  取出第一个task
     */
    q = QUEUE_HEAD(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

     // 从队列中移除这个task
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }
   /*
    *  解锁
    */
    uv_mutex_unlock(&mutex);

   /*
    *  根据节点q取出 uv__work 然后调用自己的回调函数
    */
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}

可以看到,多个线程都会在worker方法中等待在conn条件变量上,一旦有任务加入队列,线程就会被唤醒,然后只有一个线程会得到任务的执行权,其他的线程只能继续等待。

那么如何向队列提交一个task呢?看以下代码:

 1 void uv__work_submit(uv_loop_t* loop,
 2                      struct uv__work* w,
 3                      void (*work)(struct uv__work* w),
 4                      void (*done)(struct uv__work* w, int status)) {
 5   uv_once(&once, init_once);
 6   // 构造一个task
 7   w->loop = loop;
 8   w->work = work;
 9   w->done = done;
10   // 将其插入任务队列
11   post(&w->wq);
12 }

接着看post做了什么:

static void post(QUEUE* q, enum uv__work_kind kind) {
  // 同步队列操作
  uv_mutex_lock(&mutex);
    
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }
 // 将task插入队列尾部
  QUEUE_INSERT_TAIL(&wq, q);
  // 如果当前有空闲线程,就向条件变量发送信号
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

有提交任务,就肯定会有取消一个任务的操作,是的,他就是uv__work_cancel,代码如下:


static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);
// 只有当前队列不为空并且要取消的uv__work有效时才会继续执行
  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  if (cancelled)
    QUEUE_REMOVE(&w->wq);;// 从队列中移除task

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;
// 更新这个task的状态
  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}

至此,一个线程池的组成以及实现原理都说完了,可以看到,libuv几乎是用了最少的代码完成了高效的线程池。

线程池中关于等待信号量知识点,代码如下:

static void worker(void* arg) {
   /**************此处省略N行************/
    
	uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
     *   and we're at the threshold for that. 
     *  如果任务队列是空的,或者  (slow I/O 且在临界点)
     *
     */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1; 			// 空闲线程数加1
      uv_cond_wait(&cond, &mutex);	// 一直在等待条件变量
      idle_threads -= 1;			// 被唤醒之后,说明有任务被post到队列,因此空闲线程数需要减1
    }
      
    /**************此处省略N行************/
 }
pthread_cond_wait(&cond, &mutex);

以及为什么可以使用while和if。首先解释第一点,有两个方面,线程在执行的部分访问的是进程的资源,有可能有多个线程需要访问它,为了避免由于线程并发执行所引起的资源竞争,所以要让每个线程互斥的访问公有资源,但是细心一下就会发现,如果while或者if判断的时候,不满足线程的执行条件,那么线程便会调用pthread_cond_wait阻塞自己,但是它持有的锁怎么办呢,如果他不归还操作系统,那么其他线程将会无法访问公有资源。这就要追究一下pthread_cond_wait的内部实现机制,当pthread_cond_wait被调用线程阻塞的时候,pthread_cond_wait会自动释放互斥锁。释放互斥锁的时机是什么呢:是线程从调用pthread_cond_wait到操作系统把他放在线程等待队列之后,这样做有一个很重要的原因,就是mutex的第二个作用,保护条件。想一想,线程是并发执行的,如果在没有把被阻塞的线程A放在等待队列之前,就释放了互斥锁,这就意味着其他线程比如线程B可以获得互斥锁去访问公有资源,这时候线程A所等待的条件改变了,但是它没有被放在等待队列上,导致A忽略了等待条件被满足的信号。倘若在线程A调用pthread_cond_wait开始,到把A放在等待队列的过程中,都持有互斥锁,其他线程无法得到互斥锁,就不能改变公有资源。这就保证了线程A被放在等待队列上之后才会有公有资源被改变的信号传递给等待队列。

接下来讲解使用while和if判断线程执行条件是否成立的区别。一般来说,在多线程资源竞争的时候,在一个使用资源的线程里面(消费者)判断资源是否可用,不可用便调用pthread_cond_wait,在另一个线程里面(生产者)如果判断资源可用的话,则调用pthread_cond_signal发送一个资源可用信号。但是在wait成功之后,资源就一定可以被使用么,答案是否定的,如果同时有两个或者两个以上的线程正在等待此资源,wait返回后,资源可能已经被使用了,在这种情况下,应该使用:

while(resource == FALSE)

  pthread_cond_wait(&cond, &mutex);

如果之后一个消费者,那么使用if就可以了。解释一下原因,分解pthread_cond_wait的动作为以下几步:

1,线程放在等待队列上,解锁

2,等待 pthread_cond_signal或者pthread_cond_broadcast信号之后去竞争锁

3,若竞争到互斥索则加锁。

上面讲到,有可能多个线程在等待这个资源可用的信号,信号发出后只有一个资源可用,但是有A,B两个线程都在等待,B比较速度快,获得互斥锁,然后加锁,消耗资源,然后解锁,之后A获得互斥锁,但他回去发现资源已经被使用了,它便有两个选择,一个是去访问不存在的资源,另一个就是继续等待,那么继续等待下去的条件就是使用while,要不然使用if的话pthread_cond_wait返回后,就会顺序执行下去。

下面来讲一下:pthread_cond_wait和pthread_cond_singal是怎样配对使用的:

 等待线程:

 pthread_cond_wait前要先加锁
 pthread_cond_wait内部会解锁,然后等待条件变量被其它线程激活
 pthread_cond_wait被激活后会再自动加锁
 激活线程:
 加锁(和等待线程用同一个锁)
 pthread_cond_signal发送信号
 解锁
 激活线程的上面三个操作在运行时间上都在等待线程的pthread_cond_wait函数内部。